package org.codehaus.activemq.transport;

import EDU.oswego.cs.dl.util.concurrent.CopyOnWriteArrayList;
import EDU.oswego.cs.dl.util.concurrent.Executor;
import java.net.URI;
import java.util.Collection;
import java.util.HashMap;
import java.util.Iterator;
import java.util.Map;
import javax.jms.ExceptionListener;
import javax.jms.JMSException;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.codehaus.activemq.message.Packet;
import org.codehaus.activemq.message.PacketListener;
import org.codehaus.activemq.message.Receipt;
import org.codehaus.activemq.message.ReceiptHolder;
import org.codehaus.activemq.util.ExecutorHelper;

public abstract class AbstractTransportChannel
  implements TransportChannel
{
  private static final Log log = LogFactory.getLog(AbstractTransportChannel.class);

  private CopyOnWriteArrayList listeners = new CopyOnWriteArrayList();
  private HashMap requestMap = new HashMap();
  private PacketListener packetListener;
  private ExceptionListener exceptionListener;
  private String clientID;
  private TransportChannelListener transportChannelListener;

  public void stop()
  {
    Map map = (Map)this.requestMap.clone();
    for (Iterator i = map.values().iterator(); i.hasNext(); ) {
      ReceiptHolder rh = (ReceiptHolder)i.next();
      rh.close();
    }
    map.clear();
    this.requestMap.clear();
    if (this.transportChannelListener != null) {
      this.transportChannelListener.removeClient(this);
    }
    this.exceptionListener = null;
    this.packetListener = null;
  }

  public Receipt send(Packet packet)
    throws JMSException
  {
    return send(packet, 0);
  }

  public Receipt send(Packet packet, int timeout)
    throws JMSException
  {
    ReceiptHolder rh = new ReceiptHolder();
    this.requestMap.put(packet.getId(), rh);
    doAsyncSend(packet);
    Receipt result = rh.getReceipt(timeout);
    return result;
  }

  public TransportChannelListener getTransportChannelListener()
  {
    return this.transportChannelListener;
  }

  public void setTransportChannelListener(TransportChannelListener transportChannelListener) {
    this.transportChannelListener = transportChannelListener;
  }

  public void addTransportStatusEventListener(TransportStatusEventListener listener)
  {
    this.listeners.add(listener);
  }

  public void removeTransportStatusEventListener(TransportStatusEventListener listener)
  {
    this.listeners.remove(listener);
  }

  public String getClientID() {
    return this.clientID;
  }

  public void setClientID(String clientID) {
    this.clientID = clientID;
  }

  public ExceptionListener getExceptionListener() {
    return this.exceptionListener;
  }

  public PacketListener getPacketListener() {
    return this.packetListener;
  }

  public void setPacketListener(PacketListener l)
  {
    this.packetListener = l;
  }

  public void setExceptionListener(ExceptionListener listener)
  {
    this.exceptionListener = listener;
  }

  protected void doConsumePacket(Packet packet)
  {
    if (!doHandleReceipt(packet))
      if (this.packetListener != null) {
        this.packetListener.consume(packet);
      }
      else
        log.warn("No packet listener set to receive packets");
  }

  protected boolean doHandleReceipt(Packet packet)
  {
    boolean result = false;
    if ((packet != null) && 
      (packet.isReceipt())) {
      result = true;
      Receipt receipt = (Receipt)packet;
      ReceiptHolder rh = (ReceiptHolder)this.requestMap.remove(receipt.getCorrelationId());
      if (rh != null) {
        rh.setReceipt(receipt);
      }
      else {
        log.warn("No Packet found to match Receipt correlationId: " + receipt.getCorrelationId());
      }
    }

    return result;
  }

  protected void doAsyncSend(Packet packet)
    throws JMSException
  {
    asyncSend(packet);
  }

  protected void onAsyncException(JMSException e)
  {
    if (this.exceptionListener != null) {
      this.exceptionListener.onException(e);
    }
    else
      log.warn("Caught exception dispatching message and no ExceptionListener registered: " + e, e);
  }

  protected void fireStatusEvent(URI remoteURI, int status)
  {
    TransportStatusEvent event = new TransportStatusEvent();
    event.setChannelStatus(status);
    event.setRemoteURI(remoteURI);
    fireStatusEvent(event);
  }

  protected void fireStatusEvent(TransportStatusEvent event)
  {
    Iterator i;
    if (event != null)
      for (i = this.listeners.iterator(); i.hasNext(); ) {
        TransportStatusEventListener l = (TransportStatusEventListener)i.next();
        l.statusChanged(event);
      }
  }

  protected void stopExecutor(Executor executor)
    throws InterruptedException, JMSException
  {
    ExecutorHelper.stopExecutor(executor);
  }
}